#!/usr/bin/python3 -cimport os, sys; os.execv(os.path.dirname(sys.argv[1]) + "/../common/pywrap", sys.argv)

# This file is part of Cockpit.
#
# Copyright (C) 2023 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <https://www.gnu.org/licenses/>.

import testlib


# enable this once our cockpit/ws container can beiboot
@testlib.skipWsContainer("client setup does not work with ws container")
class TestClient(testlib.MachineCase):
    """Login scenarios for a Cockpit Client style setup.

    cockpit-ws runs on the "client" machine, configured to log into remote
    hosts through ``python3 -m cockpit.beiboot``; all sessions are opened
    against the "target" machine.
    """

    provision = {
        "client": {"address": "10.111.113.1/24", "memory_mb": 660},
        "target": {"address": "10.111.113.2/24", "memory_mb": 660},
    }

    def setUp(self) -> None:
        """Prepare both machines.

        On the target: set the hostname to "target" and remove PCP.
        On the client: write a cockpit.conf which uses cockpit.beiboot for
        SSH logins, and install a mock flatpak environment.
        """
        super().setUp()
        self.m_target = self.machines["target"]
        self.m_client = self.machines["client"]
        self.m_target.execute("hostnamectl set-hostname target")
        # validate on-demand install: this does not work on arch, non-split package
        if self.m_target.image.startswith(("debian", "ubuntu")):
            self.m_target.execute("dpkg --purge pcp")
        elif self.m_target.image == 'arch':
            self.m_target.execute("pacman -Rdd --noconfirm pcp")
        else:
            self.m_target.execute("""
                rpm --erase --verbose pcp python3-pcp
                systemctl daemon-reload
            """)

        # replicate the plumbing bits of src/client/cockpit-client to set up cockpit-beiboot
        self.m_client.write("/etc/cockpit/cockpit.conf", """
[WebService]
X-For-CockpitClient = true
LoginTo = true

[Ssh-Login]
Command = /usr/bin/env python3 -m cockpit.beiboot
""")
        # use the connect_from_flatpak() code path of beiboot.py
        self.m_client.write("/.flatpak-info", "test mock")
        # mock flatpak-spawn: drop a leading --host, export all leading
        # --env=NAME=VALUE arguments, then exec the remaining command line
        self.m_client.write("/usr/local/bin/flatpak-spawn", """#!/bin/sh -eux
if [ "$1" = "--host" ]; then shift; fi
while [ "${1#--env=}" != "$1" ]; do
    export "${1#--env=}"
    shift
    continue
done
exec "$@"
""", perm="0755")

    def check_login(self, expected_user: str) -> None:
        """Wait for a logged-in session and a loaded /system page.

        :param expected_user: text which must appear in the host selector
            (``#hosts-sel``), e.g. "admin@target"
        """
        b = self.browser
        b.wait_visible('#content')
        b.wait_in_text("#hosts-sel", expected_user)
        b.enter_page("/system")
        # devel disables preloads, so in that case only wait for an element
        # of the overview page itself
        if self.is_devel_build():
            b.wait_visible("#system_information_hostname_text")
        else:
            b.wait_visible("#page_status_notification_updates")
        b.switch_to_top()

    def logout(self, check_last_host: str | None = None) -> None:
        """Log out and wait until no session processes are left behind.

        :param check_last_host: if given, wait until the login page's server
            field is pre-filled with this value
        """
        b = self.browser

        b.logout()
        # FIXME: This is broken, nothing appears
        # b.wait_text("#brand", "Connect to:")
        if check_last_host:
            b.wait_val("#server-field", check_last_host)

        # no leaked processes: on the client, cockpit-ws is the only process
        # of user admin which may remain. Loop on the pipeline's exit status;
        # the leftover processes are printed to stderr for the test log.
        self.m_client.execute("while pgrep -au admin | grep -Ev 'cockpit-ws' >&2; do sleep 1; done",
                              timeout=10)
        self.m_target.execute("while pgrep -af '([c]ockpit|[s]sh-agent)' >&2; do sleep 1; done",
                              timeout=30)

    def testBeibootNoBridge(self) -> None:
        """Run the login scenarios against a target without an installed cockpit."""
        # set up target machine: no cockpit
        self.m_target.execute("rm /usr/bin/cockpit-bridge; rm -r /usr/share/cockpit")
        self.checkLoginScenarios()

    def testBeibootWithBridge(self) -> None:
        """Run the login scenarios against an unmodified target."""
        self.checkLoginScenarios()

    def checkLoginScenarios(self) -> None:
        """Start cockpit-ws on the client and walk through the login scenarios.

        The steps depend on each other through the recent hosts list and the
        known host keys which accumulate during the run, so their order matters.
        """
        self.m_client.spawn(f"runuser -u admin -- {self.libexecdir}/cockpit-ws --no-tls", "ws.log")
        self.m_client.wait_for_cockpit_running()

        b = self.browser
        b.open("/")

        # same username + password login, unknown host key
        b.wait_text("#brand", "Connect to:")
        b.wait_not_visible("#recent-hosts-list")
        b.set_val("#server-field", "10.111.113.2")
        b.click("#login-button")
        # accept unknown host key
        b.wait_in_text("#hostkey-message-1", "10.111.113.2")
        b.wait_in_text("#hostkey-fingerprint", "SHA256:")
        b.click("#login-button")
        b.wait_text("#conversation-prompt", "admin@10.111.113.2's password: ")
        b.set_val("#conversation-input", "foobar")
        b.click("#login-button")
        self.check_login("admin@target")
        b.wait_in_text("#host-apps", "Services")
        b.wait_in_text("#host-apps", "Terminal")
        b.become_superuser()
        b.drop_superuser()
        self.logout(check_last_host="10.111.113.2")

        # remembers the last host it connected to
        b.wait_in_text("#recent-hosts-list", "10.111.113.2")

        # same username + password login, now host is known
        b.click("#login-button")
        b.wait_text("#conversation-prompt", "admin@10.111.113.2's password: ")
        b.set_val("#conversation-input", "foobar")
        b.click("#login-button")
        self.check_login("admin@target")
        b.become_superuser()

        # python3-pcp is installed on client, but not on target; recognize that
        b.go("/metrics")
        b.enter_page("/metrics")
        b.wait_in_text(".pf-v6-c-empty-state", "PCP is missing")
        # on-demand install is allowed
        b.wait_in_text(".pf-v6-c-empty-state button.pf-m-primary", "Install PCP support")
        b.click(".pf-v6-c-empty-state button.pf-m-primary")
        b.wait_in_text(".pf-v6-c-modal-box", "pcp will be installed")
        b.click(".pf-v6-c-modal-box button.cancel")

        b.drop_superuser()
        self.logout()

        # wrong password, SSH gives you three attempts
        b.click("#login-button")
        for _ in range(3):
            b.wait_text("#conversation-prompt", "admin@10.111.113.2's password: ")
            b.set_val("#conversation-input", "wrong")
            b.click("#login-button")
        b.wait_in_text("#login-error-title", "Authentication failed")
        b.wait_val("#server-field", "10.111.113.2")

        # connect to most recent host
        b.open("/")  # reset URL from /metrics and last remote =host
        b.click("#recent-hosts-list .host-line button.host-name")
        b.wait_text("#conversation-prompt", "admin@10.111.113.2's password: ")
        b.set_val("#conversation-input", "foobar")
        b.click("#login-button")
        self.check_login("admin@target")
        self.logout()

        # different user name + password
        self.m_target.execute("useradd -s /bin/bash user; echo user:barfoo | chpasswd")
        b.set_val("#server-field", "user@10.111.113.2")
        b.click("#login-button")
        b.wait_text("#conversation-prompt", "user@10.111.113.2's password: ")
        b.set_val("#conversation-input", "barfoo")
        b.click("#login-button")
        self.check_login("user@target")

        # not a sudoer
        b.open_superuser_dialog()
        b.set_input_text(".pf-v6-c-modal-box:contains('Switch to administrative access') input", "barfoo")
        b.click(".pf-v6-c-modal-box button:contains('Authenticate')")
        b.click(".pf-v6-c-modal-box:contains('Problem becoming administrator') button:contains('Close')")
        b.wait_not_present(".pf-v6-c-modal-box")
        b.check_superuser_indicator("Limited access")

        self.logout()
        b.wait_in_text("#recent-hosts-list", "user@10.111.113.2")

        # explicit port
        b.set_val("#server-field", "10.111.113.2:22")
        b.click("#login-button")
        b.wait_text("#conversation-prompt", "admin@10.111.113.2's password: ")
        b.set_val("#conversation-input", "foobar")
        b.click("#login-button")
        self.check_login("admin@target")
        self.logout()
        b.wait_in_text("#recent-hosts-list", "10.111.113.2:22")

        # different user name + explicit port
        b.set_val("#server-field", "user@10.111.113.2:22")
        b.click("#login-button")
        b.wait_text("#conversation-prompt", "user@10.111.113.2's password: ")
        b.set_val("#conversation-input", "barfoo")
        b.click("#login-button")
        self.check_login("user@target")
        self.logout()
        b.wait_in_text("#recent-hosts-list", "user@10.111.113.2:22")

        # remove that recent hosts entry, this also avoids ambiguous selectors further down
        b.click("#recent-hosts-list .host-line:contains('user@10.111.113.2:22') button.host-remove")
        b.wait_not_in_text("#recent-hosts-list", "user@10.111.113.2:22")

        # unreachable host
        b.set_val("#server-field", "unknownhost")
        b.click("#login-button")
        b.wait_text("#login-error-title", "Refusing to connect")
        b.wait_text("#login-error-message", "Host is unknown")
        b.wait_val("#server-field", "unknownhost")
        # does not appear in recent hosts
        b.wait_in_text("#recent-hosts-list", "10.111.113.2")
        self.assertNotIn("unknownhost", b.text("#recent-hosts-list"))

        # wrong port
        b.set_val("#server-field", "10.111.113.2:222")
        b.click("#login-button")
        b.wait_in_text("#login-error-message", "Host is unknown")

        # unencrypted SSH key login
        self.m_client.execute("runuser -u admin -- ssh-keygen -t rsa -N '' -f ~admin/.ssh/id_rsa")
        pubkey = self.m_client.execute("cat ~admin/.ssh/id_rsa.pub")
        self.m_target.write("/home/user/.ssh/authorized_keys", pubkey, owner="user:user", perm="600")
        b.click("#recent-hosts-list .host-line:contains('user@10.111.113.2') button.host-name")
        self.check_login("user@target")
        self.logout()

        # encrypted SSH key login
        self.m_client.execute("runuser -u admin -- ssh-keygen -f ~admin/.ssh/id_rsa -p -P '' -N foobarfoo")
        b.click("#login-button")
        b.wait_text("#conversation-prompt", "Enter passphrase for key '/home/admin/.ssh/id_rsa': ")
        # a wrong passphrase clears the input and asks again
        b.set_val("#conversation-input", "wrong")
        b.click("#login-button")
        b.wait_val("#conversation-input", "")
        b.wait_text("#conversation-prompt", "Enter passphrase for key '/home/admin/.ssh/id_rsa': ")
        b.set_val("#conversation-input", "foobarfoo")
        b.click("#login-button")
        self.check_login("user@target")
        self.logout()


# Script entry point: hand control over to the shared test runner
if __name__ == "__main__":
    testlib.test_main()
